//! This module defines the [`CanisterHttpPoolManagerImpl`], which is an object
//! responsible for managing the flow of requests from execution to the
//! networking component, and ensuring that the resulting responses are signed
//! and eventually make it into consensus.
use crate::metrics::CanisterHttpPoolManagerMetrics;
use ic_consensus_utils::{
    crypto::ConsensusCrypto, is_current_protocol_version, membership::Membership,
    registry_version_at_height,
};
use ic_interfaces::{
    canister_http::*, consensus_pool::ConsensusPoolCache, p2p::consensus::PoolMutationsProducer,
};
use ic_interfaces_adapter_client::*;
use ic_interfaces_registry::RegistryClient;
use ic_interfaces_state_manager::StateReader;
use ic_logger::*;
use ic_metrics::MetricsRegistry;
use ic_registry_client_helpers::api_boundary_node::ApiBoundaryNodeRegistry;
use ic_registry_client_helpers::node::NodeRegistry;
use ic_registry_client_helpers::subnet::SubnetRegistry;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::ReplicatedState;
use ic_types::{
    Height, NumBytes, ReplicaVersion, canister_http::*, consensus::HasHeight, crypto::Signed,
    messages::CallbackId, replica_config::ReplicaConfig,
};
use ic_utils::str::StrEllipsize;
use std::{
    cell::RefCell,
    collections::{BTreeSet, HashSet},
    convert::TryInto,
    sync::{Arc, Mutex},
    time::Duration,
};

pub type CanisterHttpAdapterClient =
    Box<dyn NonBlockingChannel<CanisterHttpRequest, Response = CanisterHttpResponse> + Send>;

/// [`CanisterHttpPoolManagerImpl`] implements the pool and state monitoring
/// functionality that is necessary to ensure that http requests are made and
/// responses can be inserted into consensus. Concretely, it has the following responsibilities:
/// - It must decide when to trigger purging by noticing when consensus time changes
/// - Inform the HttpAdapterShim to make a request when new requests appear in the replicated state
/// - Sign response shares once a request is made
/// - Validate shares in the unvalidated pool that were received from gossip
pub struct CanisterHttpPoolManagerImpl {
    registry_client: Arc<dyn RegistryClient>,
    state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
    http_adapter_shim: Arc<Mutex<CanisterHttpAdapterClient>>,
    consensus_pool_cache: Arc<dyn ConsensusPoolCache>,
    crypto: Arc<dyn ConsensusCrypto>,
    membership: Arc<Membership>,
    replica_config: ReplicaConfig,
    subnet_type: SubnetType,
    requested_id_cache: RefCell<BTreeSet<CallbackId>>,
    metrics: CanisterHttpPoolManagerMetrics,
    log: ReplicaLogger,
}

const MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES: usize = 1024; // 1KB
const CANDID_OVERHEAD_RESERVE_BYTES: u64 = 1024; // 1KB

// Checks that the response size is within the allowed limits.
// In general, the response size must be lower than max_response_bytes, however there are to caveats:
//   1. if the response is an error (CanisterHttpResponseContent::Reject),
//      then the response size can be larger (generated by the replica)
//   2. the max_response_bytes is enforced before the candid encoding,
//      meaning that we need to reserve some bytes for the candid overhead.
// This check should be made on responses which consensus has not been reached on, such as the ones
// coming from non replicated requests.
fn validate_response_size(
    response: &CanisterHttpResponse,
    max_response_bytes: Option<NumBytes>,
) -> Result<(), String> {
    match &response.content {
        CanisterHttpResponseContent::Success(bytes) => {
            let response_size = bytes.len() as u64;
            let max_response_size = match max_response_bytes {
                Some(response_size) => response_size.get(),
                None => MAX_CANISTER_HTTP_RESPONSE_BYTES,
            };
            let max_response_size_including_candid_overhead =
                max_response_size + CANDID_OVERHEAD_RESERVE_BYTES;
            if response_size > max_response_size_including_candid_overhead {
                Err(format!(
                    "Response size {response_size} exceeds the maximum allowed size of {max_response_size_including_candid_overhead}"
                ))
            } else {
                Ok(())
            }
        }
        CanisterHttpResponseContent::Reject(reject) => {
            let response_size = reject.message.len();
            if response_size > MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES {
                Err(format!(
                    "Reject message size {response_size} exceeds the maximum allowed size of {MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES}"
                ))
            } else {
                Ok(())
            }
        }
    }
}

impl CanisterHttpPoolManagerImpl {
    /// Create a new [`CanisterHttpPoolManagerImpl`]
    pub fn new(
        state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
        http_adapter_shim: Arc<Mutex<CanisterHttpAdapterClient>>,
        crypto: Arc<dyn ConsensusCrypto>,
        consensus_pool_cache: Arc<dyn ConsensusPoolCache>,
        replica_config: ReplicaConfig,
        subnet_type: SubnetType,
        registry_client: Arc<dyn RegistryClient>,
        metrics_registry: MetricsRegistry,
        log: ReplicaLogger,
    ) -> Self {
        let membership = Arc::new(Membership::new(
            consensus_pool_cache.clone(),
            registry_client.clone(),
            replica_config.subnet_id,
        ));

        Self {
            state_reader,
            http_adapter_shim,
            crypto,
            replica_config,
            subnet_type,
            membership,
            consensus_pool_cache,
            registry_client,
            metrics: CanisterHttpPoolManagerMetrics::new(&metrics_registry),
            log,
            requested_id_cache: RefCell::new(BTreeSet::new()),
        }
    }

    /// Purge shares of responses for requests that have already been processed.
    fn purge_shares_of_processed_requests(
        &self,
        canister_http_pool: &dyn CanisterHttpPool,
    ) -> CanisterHttpChangeSet {
        let _time = self
            .metrics
            .op_duration
            .with_label_values(&["purge_shares"])
            .start_timer();

        let active_callback_ids = self.active_callback_ids();
        let next_callback_id = self.next_callback_id();

        let ids_to_remove_from_cache: Vec<_> = self
            .requested_id_cache
            .borrow()
            .difference(&active_callback_ids)
            .cloned()
            .collect();

        for callback_id in ids_to_remove_from_cache.iter() {
            self.requested_id_cache.borrow_mut().remove(callback_id);
        }

        canister_http_pool
            .get_validated_shares()
            .filter_map(|share| {
                if active_callback_ids.contains(&share.content.id) {
                    None
                } else {
                    Some(CanisterHttpChangeAction::RemoveValidated(share.clone()))
                }
            })
            .chain(
                canister_http_pool
                    .get_unvalidated_artifacts()
                    // Only check the unvalidated shares belonging to the requests that we can validate.
                    .filter(|artifact| artifact.share.content.id < next_callback_id)
                    .filter_map(|artifact| {
                        let share = &artifact.share;
                        if active_callback_ids.contains(&share.content.id) {
                            None
                        } else {
                            Some(CanisterHttpChangeAction::RemoveUnvalidated(share.clone()))
                        }
                    }),
            )
            .chain(
                canister_http_pool
                    .get_response_content_items()
                    .filter_map(|content| {
                        if active_callback_ids.contains(&content.1.id) {
                            None
                        } else {
                            Some(CanisterHttpChangeAction::RemoveContent(content.0.clone()))
                        }
                    }),
            )
            .collect()
    }

    fn get_socks_proxy_addrs(&self) -> Vec<String> {
        let latest_registry_version = self.registry_client.get_latest_version();

        let allowed_boundary_nodes = match self.subnet_type {
            SubnetType::System => self
                .registry_client
                .get_system_api_boundary_node_ids(latest_registry_version),
            SubnetType::Application | SubnetType::VerifiedApplication => self
                .registry_client
                .get_app_api_boundary_node_ids(latest_registry_version),
        };

        allowed_boundary_nodes
            .unwrap_or_else(|e| {
                warn!(self.log, "Failed to get API boundary node IDs: {:?}", e);
                Vec::new()
            })
            .into_iter()
            .filter_map(|id| {
                self.registry_client
                    .get_node_record(id, latest_registry_version)
                    .map_err(|e| {
                        warn!(
                            self.log,
                            "Failed to get node record for node ID {:?}: {:?}", id, e
                        );
                    })
                    .ok()
                    .and_then(|opt_record| {
                        opt_record.or_else(|| {
                            warn!(self.log, "No node record found for node ID {:?}", id);
                            None
                        })
                    })
                    .and_then(|record| {
                        record.http.or_else(|| {
                            warn!(self.log, "HTTP information missing for node ID {:?}", id);
                            None
                        })
                    })
                    .map(|http_info| format!("socks5h://[{0}]:1080", http_info.ip_addr))
            })
            .collect::<Vec<String>>()
    }

    /// Inform the HttpAdapterShim of any new requests that must be made.
    fn make_new_requests(&self, canister_http_pool: &dyn CanisterHttpPool) {
        let _time = self
            .metrics
            .op_duration
            .with_label_values(&["make_new_requests"])
            .start_timer();

        let http_requests = &self
            .latest_state()
            .metadata
            .subnet_call_context_manager
            .canister_http_request_contexts;

        self.metrics
            .in_flight_requests
            .set(http_requests.len().try_into().unwrap());

        let request_ids_in_pool: BTreeSet<_> = canister_http_pool
            .get_validated_shares()
            .filter_map(|share| {
                if share.signature.signer == self.replica_config.node_id {
                    Some(share.content.id)
                } else {
                    None
                }
            })
            .collect();

        let request_ids_already_made: BTreeSet<_> = request_ids_in_pool
            .union(&self.requested_id_cache.borrow())
            .cloned()
            .collect();

        let socks_proxy_addrs = self.get_socks_proxy_addrs();

        for (id, context) in http_requests {
            if let Replication::NonReplicated(delegated_node_id) = context.replication
                && delegated_node_id != self.replica_config.node_id
            {
                // If the request is delegated to another node, we do not make a request.
                // The delegated node will handle it.
                continue;
            }

            if !request_ids_already_made.contains(id) {
                let timeout = context.time + Duration::from_secs(5 * 60);
                if let Err(err) = self
                    .http_adapter_shim
                    .lock()
                    .unwrap()
                    .send(CanisterHttpRequest {
                        id: *id,
                        timeout,
                        context: context.clone(),
                        socks_proxy_addrs: socks_proxy_addrs.clone(),
                    })
                {
                    warn!(
                        self.log,
                        "Failed to add canister http request to queue {:?}", err
                    )
                } else {
                    self.requested_id_cache.borrow_mut().insert(*id);
                }
            }
        }
    }

    /// Create any shares that should be made from responses provided by the
    /// HttpAdapterShim.
    fn create_shares_from_responses(&self, finalized_height: Height) -> CanisterHttpChangeSet {
        let _time = self
            .metrics
            .op_duration
            .with_label_values(&["create_shares_from_responses"])
            .start_timer();
        let registry_version = if let Some(registry_version) =
            registry_version_at_height(self.consensus_pool_cache.as_ref(), finalized_height)
        {
            registry_version
        } else {
            error!(
                self.log,
                "Unable to obtain registry version for use for signing canister http responses",
            );
            return Vec::new();
        };
        let mut change_set = Vec::new();

        let active_contexts = &self
            .latest_state()
            .metadata
            .subnet_call_context_manager
            .canister_http_request_contexts;

        loop {
            match self.http_adapter_shim.lock().unwrap().try_receive() {
                Err(TryReceiveError::Empty) => break,
                Ok(mut response) => {
                    // Truncate the reject message if it's too long.
                    //
                    // The "happy path" response is organically bounded by max_response_bytes, however we need to set a
                    // limit for the error message as well.
                    //
                    // The current limit is 1KB, which should be reasonable for an error message.
                    if let CanisterHttpResponseContent::Reject(reject) = &mut response.content
                        && reject.message.len() > MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES
                    {
                        let original_len = reject.message.len();
                        warn!(
                            self.log,
                            "Pruning oversized reject message for request ID {}. Original size: {}, New size: {}",
                            response.id,
                            original_len,
                            MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES
                        );
                        reject.message = reject
                            .message
                            .ellipsize(MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES, 90);
                    }

                    let response_metadata = CanisterHttpResponseMetadata {
                        id: response.id,
                        timeout: response.timeout,
                        registry_version,
                        content_hash: ic_types::crypto::crypto_hash(&response),
                        replica_version: ReplicaVersion::default(),
                    };
                    let signature = if let Ok(signature) = self
                        .crypto
                        .sign(
                            &response_metadata,
                            self.replica_config.node_id,
                            registry_version,
                        )
                        .map_err(|err| error!(self.log, "Failed to sign http response {}", err))
                    {
                        signature
                    } else {
                        continue;
                    };
                    let share = Signed {
                        content: response_metadata,
                        signature,
                    };
                    self.requested_id_cache.borrow_mut().remove(&response.id);
                    self.metrics.shares_signed.inc();

                    if let Some(context) = active_contexts.get(&response.id)
                        && matches!(context.replication, Replication::NonReplicated(_))
                    {
                        if let Err(err) =
                            validate_response_size(&response, context.max_response_bytes)
                        {
                            warn!(
                                self.log,
                                "Http Response for request ID {} is too large: {}",
                                response.id,
                                err
                            );
                            continue;
                        }

                        change_set.push(CanisterHttpChangeAction::AddToValidatedAndGossipResponse(
                            share, response,
                        ));
                        continue;
                    }

                    //TODO(IC-1967): don't create this share if relevant context is no longer in the replicated state.
                    change_set.push(CanisterHttpChangeAction::AddToValidated(share, response));
                }
            }
        }
        change_set
    }

    /// Validate any shares found in the unvalidated section of the canister http pool.
    fn validate_shares(
        &self,
        consensus_cache: &dyn ConsensusPoolCache,
        canister_http_pool: &dyn CanisterHttpPool,
        finalized_height: Height,
    ) -> CanisterHttpChangeSet {
        let _time = self
            .metrics
            .op_duration
            .with_label_values(&["validate_shares"])
            .start_timer();
        let registry_version = if let Some(registry_version) =
            registry_version_at_height(consensus_cache, finalized_height)
        {
            registry_version
        } else {
            error!(
                self.log,
                "Unable to obtain registry version for use for signing canister http responses",
            );
            return Vec::new();
        };

        let active_contexts = &self
            .latest_state()
            .metadata
            .subnet_call_context_manager
            .canister_http_request_contexts;
        let next_callback_id = self.next_callback_id();

        let key_from_share =
            |share: &CanisterHttpResponseShare| (share.signature.signer, share.content.id);

        let mut existing_signed_requests: HashSet<_> = canister_http_pool
            .get_validated_shares()
            .map(key_from_share)
            .collect();

        canister_http_pool
            .get_unvalidated_artifacts()
            .filter(|artifact| artifact.share.content.id < next_callback_id)
            .filter_map(|artifact| {
                let share = &artifact.share;

                if existing_signed_requests.contains(&key_from_share(share)) {
                    return match is_current_protocol_version(&share.content.replica_version) {
                        true => Some(CanisterHttpChangeAction::HandleInvalid(
                            share.clone(),
                            "Redundant share".into(),
                        )),
                        false => Some(CanisterHttpChangeAction::RemoveUnvalidated(share.clone())),
                    };
                }

                let Some(context) = active_contexts.get(&share.content.id) else {
                    return Some(CanisterHttpChangeAction::RemoveUnvalidated(share.clone()));
                };

                match context.replication {
                    Replication::NonReplicated(node_id) => {
                        if node_id != share.signature.signer {
                            return Some(CanisterHttpChangeAction::HandleInvalid(
                                share.clone(),
                                "Share signed by node that is not the delegated node for the request".to_string(),
                            ));
                        }

                        let Some(response) = &artifact.response else {
                            // The request is not fully replicated, but the response is missing.
                            return Some(CanisterHttpChangeAction::HandleInvalid(share.clone(),
                                "Artifact should contain response".to_string()));
                        };

                        if share.content.content_hash != ic_types::crypto::crypto_hash(response) {
                            return Some(CanisterHttpChangeAction::HandleInvalid(
                                share.clone(),
                                "Content hash does not match the response".to_string(),
                            ));
                        }

                        //TODO(IC-1966): we should also check the response size when validating the block payload.

                        // An honest replica enforces that response.content.count_bytes() does not exceed max_response_bytes
                        // when the content is `Success`. However it doesn't enroce anything in the case of `Failure`.
                        // As we still want to set a limit for failure, we enforce 1KB, which is reasonable for
                        // an error message.

                        if let Err(err) = validate_response_size(response, context.max_response_bytes) {
                            return Some(CanisterHttpChangeAction::HandleInvalid(
                                share.clone(),
                                format!("Http Response for request ID {} is too large: {}", response.id, err),
                            ));
                        }
                    }
                    Replication::FullyReplicated => {
                        // Fully replicated requests must not have a response attached.
                        if artifact.response.is_some() {
                            return Some(CanisterHttpChangeAction::HandleInvalid(
                                share.clone(),
                                "Artifact should not contain response".to_string(),
                            ));
                        }
                    }
                }

                let node_is_in_committee = self
                    .membership
                    .node_belongs_to_canister_http_committee(
                        finalized_height,
                        share.signature.signer,
                    )
                    .map_err(|e| {
                        warn!(
                            self.log,
                            "Unable to check membership for share at height {}, {:?}",
                            finalized_height,
                            e
                        );
                        e
                    })
                    .ok()?;
                if !node_is_in_committee {
                    return Some(CanisterHttpChangeAction::HandleInvalid(
                        share.clone(),
                        "Share signed by node that is not a member of the canister http committee"
                            .to_string(),
                    ));
                }
                // TODO: more precise error handling
                if let Err(err) = self.crypto.verify(share, registry_version) {
                    error!(self.log, "Unable to verify signature of share, {}", err);

                    self.metrics.shares_marked_invalid.inc();
                    Some(CanisterHttpChangeAction::HandleInvalid(
                        share.clone(),
                        format!("Unable to verify signature of share, {err}"),
                    ))
                } else {
                    // Update the set of existing signed requests.
                    existing_signed_requests.insert(key_from_share(share));
                    self.metrics.shares_validated.inc();
                    Some(CanisterHttpChangeAction::MoveToValidated(share.clone()))
                }
            })
            .collect()
    }

    fn generate_change_set(
        &self,
        canister_http_pool: &dyn CanisterHttpPool,
    ) -> CanisterHttpChangeSet {
        let _time = self
            .metrics
            .op_duration
            .with_label_values(&["generate_change_set"])
            .start_timer();
        let mut change_set = Vec::new();

        // Whenever we have artifacts to purge, we insert the purge change actions before everything
        // else, to avoid having in the validated pool artifacts belonging to different epochs and
        // hence preserving the expected maximal number of artifacts in the pool.
        change_set.extend(self.purge_shares_of_processed_requests(canister_http_pool));

        let finalized_height = self.consensus_pool_cache.finalized_block().height();

        if self
            .membership
            .node_belongs_to_canister_http_committee(finalized_height, self.replica_config.node_id)
            .unwrap_or(false)
        {
            // Make any requests that need to be made
            self.make_new_requests(canister_http_pool);

            // Create shares from any responses that are now available
            change_set.extend(self.create_shares_from_responses(finalized_height));
        }

        // Attempt to validate unvalidated shares
        change_set.extend(self.validate_shares(
            self.consensus_pool_cache.as_ref(),
            canister_http_pool,
            finalized_height,
        ));

        self.metrics
            .in_client_requests
            .set(self.requested_id_cache.borrow().len().try_into().unwrap());

        change_set
    }

    fn active_callback_ids(&self) -> BTreeSet<CallbackId> {
        self.state_reader
            .get_latest_state()
            .get_ref()
            .metadata
            .subnet_call_context_manager
            .canister_http_request_contexts
            .keys()
            .copied()
            .collect()
    }

    fn latest_state(&self) -> Arc<ReplicatedState> {
        self.state_reader.get_latest_state().get_ref().clone()
    }

    fn next_callback_id(&self) -> CallbackId {
        self.state_reader
            .get_latest_state()
            .get_ref()
            .metadata
            .subnet_call_context_manager
            .next_callback_id()
    }
}

impl<T: CanisterHttpPool> PoolMutationsProducer<T> for CanisterHttpPoolManagerImpl {
    type Mutations = CanisterHttpChangeSet;

    fn on_state_change(&self, canister_http_pool: &T) -> CanisterHttpChangeSet {
        if let Ok(subnet_features) = self.registry_client.get_features(
            self.replica_config.subnet_id,
            self.registry_client.get_latest_version(),
        ) && subnet_features.unwrap_or_default().http_requests
        {
            return self.generate_change_set(canister_http_pool);
        }
        vec![]
    }
}

#[cfg(test)]
pub mod test {
    use super::*;
    use assert_matches::assert_matches;
    use ic_artifact_pool::canister_http_pool::CanisterHttpPoolImpl;
    use ic_consensus_mocks::{Dependencies, dependencies};
    use ic_consensus_utils::crypto::SignVerify;
    use ic_error_types::RejectCode;
    use ic_interfaces::p2p::consensus::{MutablePool, UnvalidatedArtifact};
    use ic_interfaces_state_manager::Labeled;
    use ic_logger::replica_logger::no_op_logger;
    use ic_metrics::MetricsRegistry;
    use ic_replicated_state::metadata_state::subnet_call_context_manager::SubnetCallContext;
    use ic_test_utilities_logger::with_test_replica_logger;
    use ic_test_utilities_types::ids::subnet_test_id;
    use ic_types::{
        Height, NumBytes, RegistryVersion, Time,
        crypto::{CryptoHash, CryptoHashOf},
        messages::CallbackId,
        time::UNIX_EPOCH,
    };
    use mockall::predicate::*;
    use mockall::*;
    use std::{collections::BTreeMap, str::FromStr};

    mock! {
        pub NonBlockingChannel<Request: 'static> {
        }

        impl<Request> NonBlockingChannel<Request> for NonBlockingChannel<Request> {
            type Response = CanisterHttpResponse;

            fn send(&self, request: Request) -> Result<(), SendError<Request>>;
            fn try_receive(&mut self) -> Result<CanisterHttpResponse, TryReceiveError>;
        }
    }

    fn state_with_pending_http_calls(
        http_calls: BTreeMap<CallbackId, CanisterHttpRequestContext>,
    ) -> ReplicatedState {
        // Add some pending http calls
        let mut replicated_state = ReplicatedState::new(subnet_test_id(0), SubnetType::System);
        // This will increase the next_call_id to 1
        if let Some(val) = http_calls.values().next() {
            replicated_state
                .metadata
                .subnet_call_context_manager
                .push_context(SubnetCallContext::CanisterHttpRequest(val.clone()));
        }
        replicated_state
            .metadata
            .subnet_call_context_manager
            .canister_http_request_contexts = http_calls;
        replicated_state
    }

    fn empty_canister_http_response(id: u64) -> CanisterHttpResponse {
        CanisterHttpResponse {
            id: CallbackId::from(id),
            canister_id: ic_types::CanisterId::from(0),
            timeout: Time::from_nanos_since_unix_epoch(0),
            content: CanisterHttpResponseContent::Success(Vec::new()),
        }
    }

    #[test]
    pub fn test_validation_of_shares_above_known_requests() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::FullyReplicated,
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request,
                        )]))),
                    ));

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                // Try to insert a share for request id 1 (while the next expected one is the
                // default value 0).
                {
                    let response_metadata = CanisterHttpResponseMetadata {
                        id: CallbackId::from(1),
                        timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                        registry_version: RegistryVersion::from(1),
                        content_hash: CryptoHashOf::new(CryptoHash(vec![])),
                        replica_version: ReplicaVersion::default(),
                    };

                    let signature = crypto
                        .sign(
                            &response_metadata,
                            replica_config.node_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap();

                    let share = Signed {
                        content: response_metadata.clone(),
                        signature,
                    };

                    let artifact = CanisterHttpResponseArtifact {
                        share,
                        response: None,
                    };

                    canister_http_pool.insert(UnvalidatedArtifact {
                        message: artifact,
                        peer_id: replica_config.node_id,
                        timestamp: UNIX_EPOCH,
                    });
                }

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager as Arc<_>,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                let changes = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(0),
                );

                // Make sure the changes are empty (share was filtered out)
                assert!(changes.is_empty());
            })
        });
    }

    #[test]
    fn test_invalidation_of_invalid_version() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::FullyReplicated,
                    pricing_version: PricingVersion::Legacy,
                };

                // NOTE: We need at least some context in the state, otherwise next_callback_id will be 0 and no
                // artifacts can have a smaller callback_id and be valid
                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request,
                        )]))),
                    ));

                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(0),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: CryptoHashOf::new(CryptoHash(vec![])),
                    replica_version: ReplicaVersion::default(),
                };

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                let signature = crypto
                    .sign(
                        &response_metadata,
                        replica_config.node_id,
                        RegistryVersion::from(1),
                    )
                    .unwrap();

                let mut share = Signed {
                    content: response_metadata.clone(),
                    signature,
                };

                // add a share (plus content) to the validated pool
                canister_http_pool.apply(vec![CanisterHttpChangeAction::AddToValidated(
                    share.clone(),
                    empty_canister_http_response(7),
                )]);

                // add an unvalidated copy of the share, that has an outdated version instead
                share.content.replica_version =
                    ReplicaVersion::from_str("outdated_version").unwrap();

                let artifact = CanisterHttpResponseArtifact {
                    share,
                    response: None,
                };

                canister_http_pool.insert(UnvalidatedArtifact {
                    message: artifact,
                    peer_id: replica_config.node_id,
                    timestamp: UNIX_EPOCH,
                });

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager as Arc<_>,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                let changes = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(0),
                );

                assert_matches!(&changes[0], CanisterHttpChangeAction::RemoveUnvalidated(_));
            })
        });
    }

    #[test]
    pub fn test_invalidation_of_redundant_shares() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::FullyReplicated,
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request,
                        )]))),
                    ));

                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(0),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: CryptoHashOf::new(CryptoHash(vec![])),
                    replica_version: ReplicaVersion::default(),
                };

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                // Insert the first share as validated.
                {
                    let signature = crypto
                        .sign(
                            &response_metadata,
                            replica_config.node_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap();

                    let share = Signed {
                        content: response_metadata.clone(),
                        signature,
                    };

                    let content = empty_canister_http_response(7);
                    canister_http_pool.apply(vec![CanisterHttpChangeAction::AddToValidated(
                        share, content,
                    )]);
                }

                // Insert the second share as unvalidated.
                {
                    let signature = crypto
                        .sign(
                            &response_metadata,
                            replica_config.node_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap();

                    let share = Signed {
                        content: response_metadata.clone(),
                        signature,
                    };

                    let artifact = CanisterHttpResponseArtifact {
                        share,
                        response: None,
                    };

                    canister_http_pool.insert(UnvalidatedArtifact {
                        message: artifact,
                        peer_id: replica_config.node_id,
                        timestamp: UNIX_EPOCH,
                    });
                }

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager as Arc<_>,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                let changes = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(0),
                );

                // Make sure the second share is sorted out as invalid, for the right reason.
                if let CanisterHttpChangeAction::HandleInvalid(_, err) = &changes[0] {
                    assert_eq!(err, "Redundant share");
                } else {
                    panic!("unexpected change action");
                }
            })
        });
    }

    #[test]
    fn test_non_replicated_share_validation_logic() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);

                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request,
                        )]))),
                    ));

                let response = empty_canister_http_response(0);
                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(0),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: ic_types::crypto::crypto_hash(&response),
                    replica_version: ReplicaVersion::default(),
                };

                let signature = crypto
                    .sign(
                        &response_metadata,
                        delegated_node_id,
                        RegistryVersion::from(1),
                    )
                    .unwrap();

                let share = Signed {
                    content: response_metadata.clone(),
                    signature,
                };

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager.clone(),
                    shim.clone(),
                    crypto.clone(),
                    pool.get_cache(),
                    replica_config.clone(),
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log.clone(),
                );

                // TEST 1: Non-replicated request artifact is missing the response.
                // It should be marked as invalid.
                {
                    let mut canister_http_pool =
                        CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());
                    let artifact_without_response = CanisterHttpResponseArtifact {
                        share: share.clone(),
                        response: None, // Missing response
                    };
                    canister_http_pool.insert(UnvalidatedArtifact {
                        message: artifact_without_response,
                        peer_id: delegated_node_id,
                        timestamp: UNIX_EPOCH,
                    });

                    let changes = pool_manager.validate_shares(
                        pool.get_cache().as_ref(),
                        &canister_http_pool,
                        Height::from(0),
                    );

                    assert_matches!(&changes[0], CanisterHttpChangeAction::HandleInvalid(_, reason) if reason == "Artifact should contain response");
                }

                // TEST 2: Non-replicated request artifact has a mismatched content hash.
                // It should be marked as invalid.
                {
                    let mut canister_http_pool =
                        CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                    let mut bad_share = share.clone();
                    bad_share.content.content_hash = CryptoHashOf::new(CryptoHash(vec![1, 2, 3]));

                    let artifact_with_mismatched_hash = CanisterHttpResponseArtifact {
                        share: bad_share,
                        response: Some(response),
                    };
                    canister_http_pool.insert(UnvalidatedArtifact {
                        message: artifact_with_mismatched_hash,
                        peer_id: delegated_node_id,
                        timestamp: UNIX_EPOCH,
                    });

                    let changes = pool_manager.validate_shares(
                        pool.get_cache().as_ref(),
                        &canister_http_pool,
                        Height::from(0),
                    );

                    assert_matches!(
                        &changes[0],
                        CanisterHttpChangeAction::HandleInvalid(_, reason) if reason == "Content hash does not match the response"
                    );
                }
            })
        });
    }

    #[test]
    fn test_non_replicated_share_from_wrong_signer_is_invalid() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                // 1. SETUP: Create dependencies for a subnet with at least 3 nodes.
                let Dependencies {
                    pool,
                    replica_config, // Our node, ID 0
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);

                // Define the delegated node and a different, incorrect signer.
                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);
                let wrong_signer_id = ic_test_utilities_types::ids::node_test_id(2);
                let callback_id = CallbackId::from(0);

                // 2. CONTEXT: The request is explicitly delegated to `delegated_node_id`.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };
                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            callback_id,
                            request_context,
                        )]))),
                    ));

                // 3. MALICIOUS ARTIFACT: Create a share that is signed by the `wrong_signer_id`.
                let response = empty_canister_http_response(callback_id.get());
                let response_metadata = CanisterHttpResponseMetadata {
                    id: callback_id,
                    timeout: response.timeout,
                    registry_version: RegistryVersion::from(1),
                    content_hash: ic_types::crypto::crypto_hash(&response),
                    replica_version: ReplicaVersion::default(),
                };
                let share = Signed {
                    content: response_metadata.clone(),
                    // The signature is created by the WRONG node.
                    signature: crypto
                        .sign(
                            &response_metadata,
                            wrong_signer_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap(),
                };

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());
                canister_http_pool.insert(UnvalidatedArtifact {
                    message: CanisterHttpResponseArtifact {
                        share,
                        response: Some(response),
                    },
                    peer_id: wrong_signer_id, // The artifact comes from the wrong signer.
                    timestamp: UNIX_EPOCH,
                });

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    Arc::new(Mutex::new(Box::new(MockNonBlockingChannel::new()))),
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 4. ACTION: Our replica attempts to validate the artifact.
                let change_set = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(1),
                );

                // 5. ASSERTION: The artifact must be invalidated with the specific reason.
                assert_eq!(change_set.len(), 1, "Expected exactly one change action");
                assert_matches!(
                    &change_set[0],
                    CanisterHttpChangeAction::HandleInvalid(_, reason) => {
                        assert_eq!(reason, "Share signed by node that is not the delegated node for the request");
                    }
                );
            })
        });
    }

    #[test]
    fn test_fully_replicated_share_with_response_is_invalid() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                // This request is fully replicated across the committee.
                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::FullyReplicated,
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request,
                        )]))),
                    ));

                let response = empty_canister_http_response(0);
                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(0),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: ic_types::crypto::crypto_hash(&response),
                    replica_version: ReplicaVersion::default(),
                };

                let signature = crypto
                    .sign(
                        &response_metadata,
                        replica_config.node_id,
                        RegistryVersion::from(1),
                    )
                    .unwrap();

                let share = Signed {
                    content: response_metadata.clone(),
                    signature,
                };

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                // Create an artifact that incorrectly includes a response for a fully replicated request.
                let artifact_with_response = CanisterHttpResponseArtifact {
                    share,
                    response: Some(response), // This should NOT be here for a fully replicated request
                };

                canister_http_pool.insert(UnvalidatedArtifact {
                    message: artifact_with_response,
                    peer_id: replica_config.node_id,
                    timestamp: UNIX_EPOCH,
                });

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                let changes = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(0),
                );

                // The change action should be HandleInvalid because a fully replicated request's
                // artifact must not contain a response in the unvalidated pool.
                assert_matches!(
                    &changes[0],
                    CanisterHttpChangeAction::HandleInvalid(_, reason) if reason == "Artifact should not contain response"
                );
            })
        });
    }

    #[test]
    fn test_non_replicated_share_response_size_validation() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);
                let max_response_bytes = NumBytes::from(2000);

                // 1. Set up a state context with a specific max_response_bytes limit.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: Some(max_response_bytes),
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request_context,
                        )]))),
                    ));

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager.clone(),
                    shim,
                    crypto.clone(),
                    pool.get_cache(),
                    replica_config.clone(),
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // SCENARIO A: Response size is LARGER than the limit.
                // It should be marked as invalid.
                {
                    let mut canister_http_pool =
                        CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                    // Create a response with content that is one byte too large,
                    // accounting for the Candid overhead.
                    let oversized_len =
                        (max_response_bytes.get() + CANDID_OVERHEAD_RESERVE_BYTES + 1) as usize;
                    let response_body_too_large = vec![0; oversized_len];
                    let response = CanisterHttpResponse {
                        id: CallbackId::from(0),
                        canister_id: ic_types::CanisterId::from(0),
                        timeout: Time::from_nanos_since_unix_epoch(0),
                        content: CanisterHttpResponseContent::Success(response_body_too_large),
                    };

                    let response_metadata = CanisterHttpResponseMetadata {
                        id: CallbackId::from(0),
                        timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                        registry_version: RegistryVersion::from(1),
                        content_hash: ic_types::crypto::crypto_hash(&response),
                        replica_version: ReplicaVersion::default(),
                    };
                    let share = Signed {
                        content: response_metadata.clone(),
                        signature: crypto
                            .sign(
                                &response_metadata,
                                delegated_node_id,
                                RegistryVersion::from(1),
                            )
                            .unwrap(),
                    };
                    canister_http_pool.insert(UnvalidatedArtifact {
                        message: CanisterHttpResponseArtifact {
                            share,
                            response: Some(response),
                        },
                        peer_id: delegated_node_id,
                        timestamp: UNIX_EPOCH,
                    });

                    let changes = pool_manager.validate_shares(
                        pool.get_cache().as_ref(),
                        &canister_http_pool,
                        Height::from(0),
                    );

                    // The error from `validate_response_size` itself.
                    let validation_err = format!(
                        "Response size {} exceeds the maximum allowed size of {}",
                        oversized_len,
                        max_response_bytes.get() + CANDID_OVERHEAD_RESERVE_BYTES
                    );
                    // The full error message produced by the `validate_shares` function.
                    let expected_err = format!(
                        "Http Response for request ID {} is too large: {}",
                        CallbackId::from(0),
                        validation_err
                    );

                    assert_matches!(
                        &changes[0],
                        CanisterHttpChangeAction::HandleInvalid(_, reason) if reason == &expected_err
                    );
                }

                // SCENARIO B: Response size is EXACTLY the limit.
                // It should be successfully validated.
                {
                    let mut canister_http_pool =
                        CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                    // Create a response with content that is exactly the max size.
                    let response_body_ok = vec![0; max_response_bytes.get() as usize];
                    let response = CanisterHttpResponse {
                        id: CallbackId::from(0),
                        canister_id: ic_types::CanisterId::from(0),
                        timeout: Time::from_nanos_since_unix_epoch(0),
                        content: CanisterHttpResponseContent::Success(response_body_ok),
                    };

                    let response_metadata = CanisterHttpResponseMetadata {
                        id: CallbackId::from(0),
                        timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                        registry_version: RegistryVersion::from(1),
                        content_hash: ic_types::crypto::crypto_hash(&response),
                        replica_version: ReplicaVersion::default(),
                    };
                    let share = Signed {
                        content: response_metadata.clone(),
                        signature: crypto
                            .sign(
                                &response_metadata,
                                delegated_node_id,
                                RegistryVersion::from(1),
                            )
                            .unwrap(),
                    };
                    canister_http_pool.insert(UnvalidatedArtifact {
                        message: CanisterHttpResponseArtifact {
                            share,
                            response: Some(response),
                        },
                        peer_id: delegated_node_id,
                        timestamp: UNIX_EPOCH,
                    });

                    let changes = pool_manager.validate_shares(
                        pool.get_cache().as_ref(),
                        &canister_http_pool,
                        Height::from(0),
                    );

                    assert_matches!(&changes[0], CanisterHttpChangeAction::MoveToValidated(_));
                }
            })
        });
    }

    #[test]
    fn test_reject_message_valid_when_max_response_is_zero() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                // 1. SETUP: Standard dependencies and a mock for the adapter.
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);

                // 2. CONTEXT: Create a request context where max_response_bytes is 0.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: Some(NumBytes::from(0)), // Set to zero
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request_context,
                        )]))),
                    ));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager.clone(),
                    Arc::new(Mutex::new(Box::new(shim_mock))),
                    crypto.clone(),
                    pool.get_cache(),
                    replica_config.clone(),
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 3. ARTIFACT: Create a Reject response. Its message size is valid
                //    (i.e., less than MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES), so it should pass
                //    validation despite the context's zero-byte limit for success responses.
                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                let response = CanisterHttpResponse {
                    id: CallbackId::from(0),
                    content: CanisterHttpResponseContent::Reject(CanisterHttpReject {
                        reject_code: RejectCode::SysTransient,
                        message: "A transient error occurred.".to_string(),
                    }),
                    ..empty_canister_http_response(0)
                };

                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(0),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: ic_types::crypto::crypto_hash(&response),
                    replica_version: ReplicaVersion::default(),
                };

                let share = Signed {
                    content: response_metadata.clone(),
                    signature: crypto
                        .sign(
                            &response_metadata,
                            delegated_node_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap(),
                };

                canister_http_pool.insert(UnvalidatedArtifact {
                    message: CanisterHttpResponseArtifact {
                        share,
                        response: Some(response),
                    },
                    peer_id: delegated_node_id,
                    timestamp: UNIX_EPOCH,
                });

                // 4. VALIDATE: Call validate_shares and check the result.
                let changes = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(0),
                );

                // 5. ASSERT: The artifact should be successfully validated and moved to the validated pool.
                assert_eq!(changes.len(), 1);
                assert_matches!(&changes[0], CanisterHttpChangeAction::MoveToValidated(_));
            })
        });
    }

    #[test]
    fn test_oversized_reject_message_is_pruned_not_invalidated() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                // 1. SETUP: Standard dependencies.
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);

                let callback_id = CallbackId::from(5);
                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);

                // 2. CONTEXT: Set up a NonReplicated request context in the state manager.
                // This ensures we test the gossiping code path.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };
                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            callback_id,
                            request_context,
                        )]))),
                    ));

                // 3. OVERSIZED RESPONSE: Define an error message that is intentionally too large.
                let oversized_len = MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES + 100;
                let max_len = MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES;
                let oversized_message = "a".repeat(oversized_len);

                let oversized_response = CanisterHttpResponse {
                    id: callback_id,
                    content: CanisterHttpResponseContent::Reject(CanisterHttpReject {
                        reject_code: RejectCode::SysFatal,
                        message: oversized_message,
                    }),
                    ..empty_canister_http_response(callback_id.get())
                };

                // 4. MOCK ADAPTER: Mock the adapter to return the oversized response once.
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                let mut sequence = Sequence::new();
                shim_mock
                    .expect_try_receive()
                    .times(1)
                    .returning(move || Ok(oversized_response.clone()))
                    .in_sequence(&mut sequence);
                shim_mock
                    .expect_try_receive()
                    .times(1)
                    .returning(|| Err(TryReceiveError::Empty))
                    .in_sequence(&mut sequence);

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 5. ACTION: Call the function to generate shares from responses.
                let change_set = pool_manager.create_shares_from_responses(Height::from(1));

                // 6. ASSERTIONS:
                assert_eq!(
                    change_set.len(),
                    1,
                    "A change action should have been created"
                );

                // Check that the action contains the *pruned* response and a share with the correct hash.
                assert_matches!(
                    &change_set[0],
                    CanisterHttpChangeAction::AddToValidatedAndGossipResponse(share, response) => {
                        // Assert that the response message was indeed truncated.
                        let pruned_message_len = if let CanisterHttpResponseContent::Reject(r) = &response.content {
                            r.message.len()
                        } else {
                            panic!("Test failed: Expected a Reject response content");
                        };
                        assert_eq!(
                            pruned_message_len, max_len,
                            "The reject message should have been truncated to the maximum allowed length"
                        );

                        // Assert that the hash in the share matches the hash of the *truncated* response.
                        let expected_hash = ic_types::crypto::crypto_hash(response);
                        assert_eq!(
                            share.content.content_hash, expected_hash,
                            "The share's content hash must match the pruned response"
                        );
                    }
                );
            })
        });
    }

    #[test]
    fn test_oversized_reject_with_multibyte_char_is_pruned_safely() {
        // This test ensures that when pruning an oversized reject message,
        // the logic correctly handles multi-byte UTF-8 characters that
        // straddle the byte limit, preventing a panic that a simple `truncate()` would cause.
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                // 1. SETUP: Standard dependencies.
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);

                let callback_id = CallbackId::from(0); // Use ID that will pass validation filter
                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);

                // 2. CONTEXT: Set up a NonReplicated request context.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };
                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            callback_id,
                            request_context,
                        )]))),
                    ));

                // 3. MALICIOUS RESPONSE: Construct a string where a 4-byte emoji ('👍')
                // starts 2 bytes before the limit, causing it to cross the boundary.
                let max_len = MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES;
                let padding = "a".repeat(max_len - 2);
                let emoji = "👍"; // A 4-byte character
                let oversized_message = format!("{}{}", padding, emoji);

                // Verify the setup: the message is oversized and the emoji crosses the boundary.
                assert!(oversized_message.len() > max_len);
                assert_eq!(padding.len(), max_len - 2);

                let oversized_response = CanisterHttpResponse {
                    id: callback_id,
                    content: CanisterHttpResponseContent::Reject(CanisterHttpReject {
                        reject_code: RejectCode::SysFatal,
                        message: oversized_message,
                    }),
                    ..empty_canister_http_response(callback_id.get())
                };

                // 4. MOCK ADAPTER: Mock the adapter to return this specific response.
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .times(1)
                    .return_once(move || Ok(oversized_response.clone()));
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 5. ACTION: Call the function. The test will fail with a panic if
                // the pruning logic is incorrect (i.e., not UTF-8 aware).
                let change_set = pool_manager.create_shares_from_responses(Height::from(1));

                // 6. ASSERTIONS:
                assert_eq!(
                    change_set.len(),
                    1,
                    "A change action should have been created"
                );

                assert_matches!(
                    &change_set[0],
                    CanisterHttpChangeAction::AddToValidatedAndGossipResponse(share, response) => {
                        let pruned_message = if let CanisterHttpResponseContent::Reject(r) = &response.content {
                            &r.message
                        } else {
                            panic!("Test failed: Expected a Reject response content");
                        };

                        // Assert that the pruned message is now within the byte limit.
                        assert!(
                            pruned_message.len() <= max_len,
                            "The pruned message (len: {}) should not exceed the max length ({})",
                            pruned_message.len(), max_len
                        );

                        // Assert the emoji is still present because ellipsize preserves the end of the string.
                        assert!(
                            pruned_message.contains(emoji),
                            "Pruned message should still contain the emoji from the suffix"
                        );

                        // Assert that the hash in the share matches the hash of the now-pruned response.
                        let expected_hash = ic_types::crypto::crypto_hash(response);
                        assert_eq!(
                            share.content.content_hash, expected_hash,
                            "The share's content hash must match the pruned response"
                        );
                    }
                );
            })
        });
    }

    #[test]
    fn test_dishonest_oversized_reject_is_invalidated() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                // 1. SETUP: Standard dependencies.
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);

                // This is the ID of the dishonest replica sending the artifact
                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);
                let callback_id = CallbackId::from(0);

                // 2. CONTEXT: A valid request context must exist for validation to proceed.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };
                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            callback_id,
                            request_context,
                        )]))),
                    ));

                // 3. DISHONEST ARTIFACT:
                let oversized_len = MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES + 1;
                let dishonest_response = CanisterHttpResponse {
                    id: callback_id,
                    content: CanisterHttpResponseContent::Reject(CanisterHttpReject {
                        reject_code: RejectCode::SysFatal,
                        message: "b".repeat(oversized_len),
                    }),
                    ..empty_canister_http_response(callback_id.get())
                };

                let dishonest_hash = ic_types::crypto::crypto_hash(&dishonest_response);
                let response_metadata = CanisterHttpResponseMetadata {
                    id: callback_id,
                    timeout: dishonest_response.timeout,
                    registry_version: RegistryVersion::from(1),
                    content_hash: dishonest_hash,
                    replica_version: ReplicaVersion::default(),
                };
                let share = Signed {
                    content: response_metadata.clone(),
                    signature: crypto
                        .sign(
                            &response_metadata,
                            delegated_node_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap(),
                };

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());
                canister_http_pool.insert(UnvalidatedArtifact {
                    message: CanisterHttpResponseArtifact {
                        share,
                        response: Some(dishonest_response.clone()),
                    },
                    peer_id: delegated_node_id,
                    timestamp: UNIX_EPOCH,
                });

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    Arc::new(Mutex::new(Box::new(MockNonBlockingChannel::new()))),
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 4. ACTION: Our replica attempts to validate the artifact.
                let change_set = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(1),
                );

                // 5. ASSERTION: The artifact is now correctly invalidated by the validate_response_size check.
                assert_eq!(change_set.len(), 1, "Expected exactly one change action");

                let expected_error = format!(
                    "Http Response for request ID {} is too large: Reject message size {} exceeds the maximum allowed size of {}",
                    callback_id, oversized_len, MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES
                );
                assert_matches!(
                    &change_set[0],
                    CanisterHttpChangeAction::HandleInvalid(_, reason) => {
                        assert_eq!(reason, &expected_error);
                    }
                );
            })
        });
    }

    #[test]
    fn test_reject_message_is_valid_when_context_limit_is_too_low() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                // This value is intentionally lower than the reject message size.
                const LOW_MAX_RESPONSE_BYTES: u64 = 10;

                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);

                // 1. Set up a state context with a very low max_response_bytes limit.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: Some(NumBytes::from(LOW_MAX_RESPONSE_BYTES)),
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(0),
                            request_context,
                        )]))),
                    ));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager.clone(),
                    Arc::new(Mutex::new(Box::new(shim_mock))),
                    crypto.clone(),
                    pool.get_cache(),
                    replica_config.clone(),
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 2. Create a reject message that is larger than the low limit, but smaller
                //    than the minimum floor.
                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());

                let reject_message =
                    "This error message is definitely longer than 10 bytes.".to_string();
                assert!(reject_message.len() as u64 > LOW_MAX_RESPONSE_BYTES);
                assert!(reject_message.len() <= MAXIMUM_ALLOWED_ERROR_MESSAGE_BYTES);

                let reject_content = CanisterHttpReject {
                    reject_code: RejectCode::SysFatal,
                    message: reject_message,
                };

                let response = CanisterHttpResponse {
                    id: CallbackId::from(0),
                    content: CanisterHttpResponseContent::Reject(reject_content),
                    ..empty_canister_http_response(0)
                };

                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(0),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: ic_types::crypto::crypto_hash(&response),
                    replica_version: ReplicaVersion::default(),
                };

                let share = Signed {
                    content: response_metadata.clone(),
                    signature: crypto
                        .sign(
                            &response_metadata,
                            delegated_node_id,
                            RegistryVersion::from(1),
                        )
                        .unwrap(),
                };

                canister_http_pool.insert(UnvalidatedArtifact {
                    message: CanisterHttpResponseArtifact {
                        share,
                        response: Some(response),
                    },
                    peer_id: delegated_node_id,
                    timestamp: UNIX_EPOCH,
                });

                // 3. Call validate_shares and assert that the share is considered VALID.
                let changes = pool_manager.validate_shares(
                    pool.get_cache().as_ref(),
                    &canister_http_pool,
                    Height::from(0),
                );

                assert_matches!(&changes[0], CanisterHttpChangeAction::MoveToValidated(_));
            })
        });
    }

    #[test]
    pub fn test_already_created_shares_not_re_requested() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 5);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::FullyReplicated,
                    pricing_version: PricingVersion::Legacy,
                };

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(7),
                            request,
                        )]))),
                    ));

                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(7),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: CryptoHashOf::new(CryptoHash(vec![])),
                    replica_version: ReplicaVersion::default(),
                };

                let signature = crypto
                    .sign(
                        &response_metadata,
                        replica_config.node_id,
                        RegistryVersion::from(1),
                    )
                    .unwrap();

                let content = empty_canister_http_response(7);
                let share = Signed {
                    content: response_metadata,
                    signature,
                };

                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());
                canister_http_pool.apply(vec![CanisterHttpChangeAction::AddToValidated(
                    share, content,
                )]);
                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager as Arc<_>,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // Because we already have a share in the pool, we should be
                // able to call on_state_change again without send being called.
                // We haven't sent an expectation on send, so this will fail if
                // send is, in fact called.
                pool_manager.generate_change_set(&canister_http_pool);
            })
        });
    }

    #[test]
    pub fn test_create_shares() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([]))),
                    ));

                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();

                let mut sequence = Sequence::new();
                for i in 3..5 {
                    shim_mock
                        .expect_try_receive()
                        .times(1)
                        .returning(move || Ok(empty_canister_http_response(i)))
                        .in_sequence(&mut sequence);
                }

                shim_mock
                    .expect_try_receive()
                    .times(1)
                    .returning(|| Err(TryReceiveError::Empty))
                    .in_sequence(&mut sequence);

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());
                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config,
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );
                let change_set = pool_manager.generate_change_set(&canister_http_pool);
                assert_eq!(change_set.len(), 2);
            });
        });
    }

    #[test]
    pub fn test_non_replicated_response_is_gossiped() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);

                let delegated_node_id = ic_test_utilities_types::ids::node_test_id(1);
                let callback_id = CallbackId::from(5);

                // 1. Set up the state to contain a non-replicated request context.
                let request_context = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::NonReplicated(delegated_node_id),
                    pricing_version: PricingVersion::Legacy,
                };
                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            callback_id,
                            request_context,
                        )]))),
                    ));

                // 2. Mock the adapter shim to return a response matching the non-replicated request.
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                let mut sequence = Sequence::new();
                shim_mock
                    .expect_try_receive()
                    .times(1)
                    .returning(move || Ok(empty_canister_http_response(callback_id.get())))
                    .in_sequence(&mut sequence);
                shim_mock
                    .expect_try_receive()
                    .times(1)
                    .returning(|| Err(TryReceiveError::Empty))
                    .in_sequence(&mut sequence);

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    shim,
                    crypto,
                    pool.get_cache(),
                    replica_config.clone(),
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );

                // 3. Call the function and get the change set.
                let change_set = pool_manager.create_shares_from_responses(Height::from(1));

                // 4. Assert that the correct change action for gossiping the response was produced.
                assert_eq!(change_set.len(), 1);

                if let CanisterHttpChangeAction::AddToValidatedAndGossipResponse(share, response) =
                    &change_set[0]
                {
                    let expected_response = empty_canister_http_response(callback_id.get());
                    assert_eq!(*response, expected_response);

                    assert_eq!(share.content.id, callback_id);
                    assert_eq!(share.content.timeout, expected_response.timeout);
                    assert_eq!(
                        share.content.content_hash,
                        ic_types::crypto::crypto_hash(&expected_response)
                    );
                    assert_eq!(share.content.registry_version, RegistryVersion::from(1));
                    assert_eq!(share.signature.signer, replica_config.node_id);
                } else {
                    panic!(
                        "Expected CanisterHttpChangeAction::AddToValidatedAndGossipResponse, but got {:?}",
                        change_set[0]
                    );
                }
            });
        });
    }

    #[test]
    pub fn test_submit_requests() {
        ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| {
            with_test_replica_logger(|log| {
                let Dependencies {
                    pool,
                    replica_config,
                    crypto,
                    state_manager,
                    registry,
                    ..
                } = dependencies(pool_config.clone(), 4);
                let mut shim_mock = MockNonBlockingChannel::<CanisterHttpRequest>::new();
                shim_mock
                    .expect_try_receive()
                    .return_const(Err(TryReceiveError::Empty));

                let request = CanisterHttpRequestContext {
                    request: ic_test_utilities_types::messages::RequestBuilder::new().build(),
                    url: "".to_string(),
                    max_response_bytes: None,
                    headers: vec![],
                    body: None,
                    http_method: CanisterHttpMethod::GET,
                    transform: None,
                    time: ic_types::Time::from_nanos_since_unix_epoch(10),
                    replication: Replication::FullyReplicated,
                    pricing_version: PricingVersion::Legacy,
                };

                // Expect times to be called exactly once to check that already
                // requested cache works.
                shim_mock
                    .expect_send()
                    .with(eq(CanisterHttpRequest {
                        id: CallbackId::from(7),
                        timeout: ic_types::Time::from_nanos_since_unix_epoch(10)
                            + Duration::from_secs(60 * 5),
                        context: request.clone(),
                        socks_proxy_addrs: vec![],
                    }))
                    .times(1)
                    .return_const(Ok(()));

                let shim: Arc<Mutex<CanisterHttpAdapterClient>> =
                    Arc::new(Mutex::new(Box::new(shim_mock)));

                state_manager
                    .get_mut()
                    .expect_get_latest_state()
                    .return_const(Labeled::new(
                        Height::from(1),
                        Arc::new(state_with_pending_http_calls(BTreeMap::from([(
                            CallbackId::from(7),
                            request,
                        )]))),
                    ));

                let pool_manager = CanisterHttpPoolManagerImpl::new(
                    state_manager,
                    shim,
                    crypto.clone(),
                    pool.get_cache(),
                    replica_config.clone(),
                    SubnetType::Application,
                    Arc::clone(&registry) as Arc<_>,
                    MetricsRegistry::new(),
                    log,
                );
                let mut canister_http_pool =
                    CanisterHttpPoolImpl::new(MetricsRegistry::new(), no_op_logger());
                let change_set = pool_manager.generate_change_set(&canister_http_pool);
                assert_eq!(change_set.len(), 0);

                let response_metadata = CanisterHttpResponseMetadata {
                    id: CallbackId::from(7),
                    timeout: ic_types::Time::from_nanos_since_unix_epoch(10),
                    registry_version: RegistryVersion::from(1),
                    content_hash: CryptoHashOf::new(CryptoHash(vec![])),
                    replica_version: ReplicaVersion::default(),
                };

                let signature = crypto
                    .sign(
                        &response_metadata,
                        replica_config.node_id,
                        RegistryVersion::from(1),
                    )
                    .unwrap();

                let content = empty_canister_http_response(7);
                let share = Signed {
                    content: response_metadata,
                    signature,
                };

                canister_http_pool.apply(vec![CanisterHttpChangeAction::AddToValidated(
                    share, content,
                )]);

                // Now that there are shares in the pool, we should be able to
                // call generate_change_set again without send being called.
                pool_manager.generate_change_set(&canister_http_pool);
            });
        });
    }
}
